Course: MTech (IS)
Project Owners:
Sudalaiandi Raja Sudalaimuthu
Jayaram Revathi
Anaconda Prompt:
conda config --append channels conda-forge
conda create -n helmet python=3.6 numpy=1.15.1 opencv=3.4.2 matplotlib=2.2.3 tensorflow-gpu=1.12.0 scipy=1.1.0 scikit-learn=0.19.1 spyder=3.3.2 yaml=0.1.7 keras-gpu=2.1.6 pillow=5.4.1 notebook=5.7.4 pandas=0.24.2 h5py=2.8.0 cython Pillow scikit-image imgaug IPython[all] tqdm
conda activate helmet
pip install git+https://github.com/philferriere/cocoapi.git#egg=pycocotools^&subdirectory=PythonAPI
git clone https://github.com/aivoyagers/Helmet_Mask_RCNN.git
cd Helmet_Mask_RCNN
python setup.py install
Download the helmet 'train' and 'val' datasets from:
https://drive.google.com/drive/folders/1VHDlu76J1bScRBSy-4WsWI_mldf_Pf-b?usp=sharing
jupyter notebook
Set 'COMMAND' to 'train' for fresh training on the downloaded helmet 'train' and 'val' datasets. For test or inference, only the 'val' dataset is required.
Execute helmet_detect.ipynb
import os
import sys
import random
import math
import re
import time
import numpy as np
import tensorflow as tf
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# Root directory of the project
ROOT_DIR = os.path.abspath("./")
# Import Mask RCNN
sys.path.append(ROOT_DIR) # To find local version of the library
from mrcnn import utils
from mrcnn import visualize
from mrcnn.visualize import display_images
import mrcnn.model as modellib
from mrcnn.model import log
import helmet
from helmet import HelmetConfig
from helmet import train
from helmet import detect_and_color_splash
%matplotlib inline
# Directory to save logs and trained model
MODEL_DIR = os.path.join(ROOT_DIR, "logs")
# Path to trained weights file
COCO_WEIGHTS_PATH = os.path.join(ROOT_DIR, "mask_rcnn_coco.h5")
COMMAND = 'inference' # Update to 'train' or 'inference'
HELMET_DIR = os.path.join(ROOT_DIR, "datasets/helmet") # Update - Required for 'train'
IMAGE_FILE = os.path.join(ROOT_DIR, "datasets/helmet/val/VID_20200315_181352_frame0.jpg") # Update - Required for 'inference'
VIDEO_FILE = None # either IMAGE_FILE or VIDEO_FILE required
# Update to 'coco' or 'last' - use the last generated weights or specify the weights file path
WEIGHTS = 'coco'
# WEIGHTS = 'last'
# WEIGHTS = os.path.join(ROOT_DIR, "mask_rcnn_helmet.h5")
# Set to False if the labels should be 'Motorcyclist_with_Helmet' or 'without_Helmet' instead
helmet.DEFAULT_LABEL_HELMET_SEPARATE = True
helmet.NBR_OF_EPOCHS = 100
# Configurations
class InferenceConfig(HelmetConfig):
    # Set batch size to 1 since we'll be running inference on
    # one image at a time. Batch size = GPU_COUNT * IMAGES_PER_GPU
    GPU_COUNT = 1
    IMAGES_PER_GPU = 1

if COMMAND == "train":
    config = HelmetConfig()
    helmet.config = config
    config.display()
# else:
#     config = InferenceConfig()
# Create model
if COMMAND == "train":
model = modellib.MaskRCNN(mode="training", config=config,
model_dir=MODEL_DIR)
print ('Model loaded for training')
# else:
# model = modellib.MaskRCNN(mode="inference", config=config,
# model_dir=MODEL_DIR)
# print ('Model loaded for inference')
if COMMAND == 'train':
    # Select weights file to load
    if WEIGHTS == "coco":
        weights_path = COCO_WEIGHTS_PATH
        # Download weights file
        if not os.path.exists(weights_path):
            utils.download_trained_weights(weights_path)
    elif WEIGHTS == "last":
        # Find last trained weights
        weights_path = model.find_last()
    elif WEIGHTS == "imagenet":
        # Start from ImageNet trained weights
        weights_path = model.get_imagenet_weights()
    else:
        weights_path = WEIGHTS
    print(f'Weights file selected for {WEIGHTS}')

    # Load weights
    print("Loading weights ", weights_path)
    if WEIGHTS == "coco":
        # Exclude the last layers because they require a matching
        # number of classes
        model.load_weights(weights_path, by_name=True, exclude=[
            "mrcnn_class_logits", "mrcnn_bbox_fc",
            "mrcnn_bbox", "mrcnn_mask"])
    else:
        model.load_weights(weights_path, by_name=True)
    print(f'{weights_path} weights loaded')
# Train or evaluate
if COMMAND == "train":
train(model, HELMET_DIR)
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.patches as patches
%matplotlib inline
def get_ax(rows=1, cols=1, size=16):
    """Return a Matplotlib Axes array to be used in
    all visualizations in the notebook. Provide a
    central point to control graph sizes.

    Adjust the size attribute to control how big to render images.
    """
    _, ax = plt.subplots(rows, cols, figsize=(size*cols, size*rows))
    return ax
# Set to inference configuration
config = InferenceConfig()
helmet.config = config
# Load model in inference mode
model = modellib.MaskRCNN(mode="inference", config=config,
                          model_dir=MODEL_DIR)
# Find last trained weights
weights_path = model.find_last()
# Load weights
model.load_weights(weights_path, by_name=True)
print(f'{weights_path} weights loaded')
# Load validation dataset
dataset = helmet.HelmetDataset()
dataset.load_helmet(HELMET_DIR, "val")
# Must call before using the dataset
dataset.prepare()
print("Images: {}\nClasses: {}".format(len(dataset.image_ids), dataset.class_names))
# image_id = random.choice(dataset.image_ids)
image_id = next(i for i, info in enumerate(dataset.image_info)
                if info['id'] == 'VID_20200315_181352_frame0.jpg')
image, image_meta, gt_class_id, gt_bbox, gt_mask =\
    modellib.load_image_gt(dataset, config, image_id, use_mini_mask=False)
info = dataset.image_info[image_id]
print("image ID: {}.{} ({}) {}".format(info["source"], info["id"], image_id,
                                       dataset.image_reference(image_id)))
# Run object detection
results = model.detect([image], verbose=1)
# Display results
ax = get_ax(1)
r = results[0]
visualize.display_instances(image, r['rois'], r['masks'], r['class_ids'],
                            dataset.class_names, r['scores'], ax=ax,
                            title="Predictions")
log("gt_class_id", gt_class_id)
log("gt_bbox", gt_bbox)
log("gt_mask", gt_mask)
This is for illustration. You can call helmet.py with the splash option to get better images without the black padding.
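A hypothetical sketch of that command line: the splash option itself is mentioned above, but the exact flags are assumptions modeled on Matterport's Mask R-CNN sample scripts, so check helmet.py's argument parser before running.
# Splash on a single image (flags assumed, not confirmed by this repo)
python helmet.py splash --weights=last --image=datasets/helmet/val/VID_20200315_181352_frame0.jpg
# Or on a video
python helmet.py splash --weights=last --video=path/to/video.mp4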
splash = helmet.color_splash(image, r['masks'])
display_images([splash], cols=1)
# Draw precision-recall curve
AP, precisions, recalls, overlaps = utils.compute_ap(gt_bbox, gt_class_id, gt_mask,
                                                     r['rois'], r['class_ids'], r['scores'], r['masks'])
visualize.plot_precision_recall(AP, precisions, recalls)
# Grid of ground truth objects and their predictions
visualize.plot_overlaps(gt_class_id, r['class_ids'], r['scores'],
                        overlaps, dataset.class_names)
# Compute VOC-style Average Precision
def compute_batch_ap(image_ids):
    APs = []
    for image_id in image_ids:
        # Load image
        image, image_meta, gt_class_id, gt_bbox, gt_mask =\
            modellib.load_image_gt(dataset, config,
                                   image_id, use_mini_mask=False)
        # Run object detection
        results = model.detect([image], verbose=0)
        # Compute AP
        r = results[0]
        AP, precisions, recalls, overlaps =\
            utils.compute_ap(gt_bbox, gt_class_id, gt_mask,
                             r['rois'], r['class_ids'], r['scores'], r['masks'])
        APs.append(AP)
    return APs
# Pick a set of random images
image_ids = np.random.choice(dataset.image_ids, 10)
APs = compute_batch_ap(image_ids)
print("mAP @ IoU=50: ", np.mean(APs))
The Region Proposal Network (RPN) runs a lightweight binary classifier on a large number of boxes (anchors) over the image and returns object/no-object scores. Anchors with a high objectness score (positive anchors) are passed to stage two to be classified.
Often, even positive anchors don't cover objects fully. So the RPN also regresses a refinement (a delta in location and size) to be applied to each anchor to shift and resize it slightly toward the correct boundaries of the object.
The RPN targets are the training values for the RPN. To generate the targets, we start with a grid of anchors that cover the full image at different scales, and then we compute the IoU of the anchors with the ground truth objects. Positive anchors are those that have an IoU >= 0.7 with any ground truth object, and negative anchors are those that don't cover any object by more than 0.3 IoU. Anchors in between (i.e. those that cover an object by IoU >= 0.3 but < 0.7) are considered neutral and excluded from training.
To train the RPN regressor, we also compute the shift and resizing needed to make each anchor cover its ground truth object completely.
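To make the 0.7/0.3 rule concrete, here is a simplified NumPy sketch of the labeling step. It is a stripped-down view of what modellib.build_rpn_targets computes below, omitting the extra rules that force at least one positive anchor per ground truth box and subsample the anchors to balance positives and negatives:
# Simplified anchor labeling: 1 = positive, -1 = negative, 0 = neutral
# overlaps[i, j] = IoU between anchor i and ground truth box j
overlaps = utils.compute_overlaps(model.anchors, gt_bbox)
anchor_iou_max = overlaps.max(axis=1)
rpn_match = np.zeros(model.anchors.shape[0], dtype=np.int32)
rpn_match[anchor_iou_max >= 0.7] = 1   # IoU >= 0.7 with some GT box: positive
rpn_match[anchor_iou_max < 0.3] = -1   # IoU < 0.3 with every GT box: negative
# Anchors with 0.3 <= IoU < 0.7 keep the value 0 (neutral, excluded from training)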
# Generate RPN training targets
# target_rpn_match is 1 for positive anchors, -1 for negative anchors
# and 0 for neutral anchors.
target_rpn_match, target_rpn_bbox = modellib.build_rpn_targets(
    image.shape, model.anchors, gt_class_id, gt_bbox, model.config)
log("target_rpn_match", target_rpn_match)
log("target_rpn_bbox", target_rpn_bbox)
positive_anchor_ix = np.where(target_rpn_match[:] == 1)[0]
negative_anchor_ix = np.where(target_rpn_match[:] == -1)[0]
neutral_anchor_ix = np.where(target_rpn_match[:] == 0)[0]
positive_anchors = model.anchors[positive_anchor_ix]
negative_anchors = model.anchors[negative_anchor_ix]
neutral_anchors = model.anchors[neutral_anchor_ix]
log("positive_anchors", positive_anchors)
log("negative_anchors", negative_anchors)
log("neutral anchors", neutral_anchors)
# Apply refinement deltas to positive anchors
refined_anchors = utils.apply_box_deltas(
    positive_anchors,
    target_rpn_bbox[:positive_anchors.shape[0]] * model.config.RPN_BBOX_STD_DEV)
log("refined_anchors", refined_anchors)
# Display positive anchors before refinement (dotted) and
# after refinement (solid).
visualize.draw_boxes(image, boxes=positive_anchors, refined_boxes=refined_anchors, ax=get_ax())
This stage takes the region proposals from the RPN and classifies them.
Run the classifier heads on the proposals to generate class probabilities and bounding box regressions.
# Get input and output to classifier and mask heads.
mrcnn = model.run_graph([image], [
    ("proposals", model.keras_model.get_layer("ROI").output),
    ("probs", model.keras_model.get_layer("mrcnn_class").output),
    ("deltas", model.keras_model.get_layer("mrcnn_bbox").output),
    ("masks", model.keras_model.get_layer("mrcnn_mask").output),
    ("detections", model.keras_model.get_layer("mrcnn_detection").output),
])
# Get detection class IDs. Trim zero padding.
det_class_ids = mrcnn['detections'][0, :, 4].astype(np.int32)
det_count = np.where(det_class_ids == 0)[0][0]
det_class_ids = det_class_ids[:det_count]
detections = mrcnn['detections'][0, :det_count]
print("{} detections: {}".format(
det_count, np.array(dataset.class_names)[det_class_ids]))
captions = ["{} {:.3f}".format(dataset.class_names[int(c)], s) if c > 0 else ""
for c, s in zip(detections[:, 4], detections[:, 5])]
visualize.draw_boxes(
image,
refined_boxes=utils.denorm_boxes(detections[:, :4], image.shape[:2]),
visibilities=[2] * len(detections),
captions=captions, title="Detections",
ax=get_ax())
Here we dive deeper into how these detections are produced, step by step.
# Proposals are in normalized coordinates. Scale them
# to image coordinates.
h, w = config.IMAGE_SHAPE[:2]
proposals = np.around(mrcnn["proposals"][0] * np.array([h, w, h, w])).astype(np.int32)
# Class ID, score, and mask per proposal
roi_class_ids = np.argmax(mrcnn["probs"][0], axis=1)
roi_scores = mrcnn["probs"][0, np.arange(roi_class_ids.shape[0]), roi_class_ids]
roi_class_names = np.array(dataset.class_names)[roi_class_ids]
roi_positive_ixs = np.where(roi_class_ids > 0)[0]
# How many ROIs vs empty rows?
print("{} Valid proposals out of {}".format(np.sum(np.any(proposals, axis=1)), proposals.shape[0]))
print("{} Positive ROIs".format(len(roi_positive_ixs)))
# Class counts
print(list(zip(*np.unique(roi_class_names, return_counts=True))))
# Display a random sample of proposals.
# Proposals classified as background are dotted, and
# the rest show their class and confidence score.
limit = 200
ixs = np.random.randint(0, proposals.shape[0], limit)
captions = ["{} {:.3f}".format(dataset.class_names[c], s) if c > 0 else ""
for c, s in zip(roi_class_ids[ixs], roi_scores[ixs])]
visualize.draw_boxes(image, boxes=proposals[ixs],
visibilities=np.where(roi_class_ids[ixs] > 0, 2, 1),
captions=captions, title="ROIs Before Refinement",
ax=get_ax())
# Class-specific bounding box shifts.
roi_bbox_specific = mrcnn["deltas"][0, np.arange(proposals.shape[0]), roi_class_ids]
log("roi_bbox_specific", roi_bbox_specific)
# Apply bounding box transformations
# Shape: [N, (y1, x1, y2, x2)]
refined_proposals = utils.apply_box_deltas(
    proposals, roi_bbox_specific * config.BBOX_STD_DEV).astype(np.int32)
log("refined_proposals", refined_proposals)
# Show positive proposals
# ids = np.arange(roi_boxes.shape[0]) # Display all
limit = 5
ids = np.random.randint(0, len(roi_positive_ixs), limit) # Display random sample
captions = ["{} {:.3f}".format(dataset.class_names[c], s) if c > 0 else ""
for c, s in zip(roi_class_ids[roi_positive_ixs][ids], roi_scores[roi_positive_ixs][ids])]
visualize.draw_boxes(image, boxes=proposals[roi_positive_ixs][ids],
refined_boxes=refined_proposals[roi_positive_ixs][ids],
visibilities=np.where(roi_class_ids[roi_positive_ixs][ids] > 0, 1, 0),
captions=captions, title="ROIs After Refinement",
ax=get_ax())
# Remove boxes classified as background
keep = np.where(roi_class_ids > 0)[0]
print("Keep {} detections:\n{}".format(keep.shape[0], keep))
# Remove low confidence detections
keep = np.intersect1d(keep, np.where(roi_scores >= config.DETECTION_MIN_CONFIDENCE)[0])
print("Remove boxes below {} confidence. Keep {}:\n{}".format(
config.DETECTION_MIN_CONFIDENCE, keep.shape[0], keep))
# Apply per-class non-max suppression
pre_nms_boxes = refined_proposals[keep]
pre_nms_scores = roi_scores[keep]
pre_nms_class_ids = roi_class_ids[keep]
nms_keep = []
for class_id in np.unique(pre_nms_class_ids):
    # Pick detections of this class
    ixs = np.where(pre_nms_class_ids == class_id)[0]
    # Apply NMS
    class_keep = utils.non_max_suppression(pre_nms_boxes[ixs],
                                           pre_nms_scores[ixs],
                                           config.DETECTION_NMS_THRESHOLD)
    # Map indices
    class_keep = keep[ixs[class_keep]]
    nms_keep = np.union1d(nms_keep, class_keep)
    print("{:22}: {} -> {}".format(dataset.class_names[class_id][:20],
                                   keep[ixs], class_keep))

keep = np.intersect1d(keep, nms_keep).astype(np.int32)
print("\nKept after per-class NMS: {}\n{}".format(keep.shape[0], keep))
# Show final detections
ixs = np.arange(len(keep)) # Display all
# ixs = np.random.randint(0, len(keep), 10) # Display random sample
captions = ["{} {:.3f}".format(dataset.class_names[c], s) if c > 0 else ""
for c, s in zip(roi_class_ids[keep][ixs], roi_scores[keep][ixs])]
visualize.draw_boxes(
image, boxes=proposals[keep][ixs],
refined_boxes=refined_proposals[keep][ixs],
visibilities=np.where(roi_class_ids[keep][ixs] > 0, 1, 0),
captions=captions, title="Detections after NMS",
ax=get_ax())
This stage takes the detections (refined bounding boxes and class IDs) from the previous layer and runs the mask head to generate segmentation masks for every instance.
These are the training targets for the mask branch.
display_images(np.transpose(gt_mask, [2, 0, 1]), cmap="Blues")
# Get predictions of mask head
mrcnn = model.run_graph([image], [
    ("detections", model.keras_model.get_layer("mrcnn_detection").output),
    ("masks", model.keras_model.get_layer("mrcnn_mask").output),
])
# Get detection class IDs. Trim zero padding.
det_class_ids = mrcnn['detections'][0, :, 4].astype(np.int32)
det_count = np.where(det_class_ids == 0)[0][0]
det_class_ids = det_class_ids[:det_count]
print("{} detections: {}".format(
det_count, np.array(dataset.class_names)[det_class_ids]))
# Masks
det_boxes = utils.denorm_boxes(mrcnn["detections"][0, :, :4], image.shape[:2])
det_mask_specific = np.array([mrcnn["masks"][0, i, :, :, c]
                              for i, c in enumerate(det_class_ids)])
det_masks = np.array([utils.unmold_mask(m, det_boxes[i], image.shape)
                      for i, m in enumerate(det_mask_specific)])
log("det_mask_specific", det_mask_specific)
log("det_masks", det_masks)
display_images(det_mask_specific[:4] * 255, cmap="Blues", interpolation="none")
display_images(det_masks[:4] * 255, cmap="Blues", interpolation="none")
In some cases it helps to look at the outputs of different layers and visualize them to catch issues and odd patterns.
# Get activations of a few sample layers
activations = model.run_graph([image], [
    ("input_image", tf.identity(model.keras_model.get_layer("input_image").output)),
    ("res2c_out", model.keras_model.get_layer("res2c_out").output),
    ("res3c_out", model.keras_model.get_layer("res3c_out").output),
    ("res4w_out", model.keras_model.get_layer("res4w_out").output), # for resnet101
    ("rpn_bbox", model.keras_model.get_layer("rpn_bbox").output),
    ("roi", model.keras_model.get_layer("ROI").output),
])
# Input image (normalized)
_ = plt.imshow(modellib.unmold_image(activations["input_image"][0], config))
# Backbone feature map
display_images(np.transpose(activations["res2c_out"][0,:,:,:4], [2, 0, 1]), cols=4)